home *** CD-ROM | disk | FTP | other *** search
/ Chip 2007 January, February, March & April / Chip-Cover-CD-2007-02.iso / Pakiet bezpieczenstwa / mini Pentoo LiveCD 2006.1 / mpentoo-2006.1.iso / livecd.squashfs / usr / lib / python2.4 / test / test_socket.pyc (.txt) < prev    next >
Python Compiled Bytecode  |  2005-10-18  |  32KB  |  958 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.4)
  3.  
  4. import unittest
  5. from test import test_support
  6. import socket
  7. import select
  8. import time
  9. import thread
  10. import threading
  11. import Queue
  12. import sys
  13. from weakref import proxy
  14. PORT = 50007
  15. HOST = 'localhost'
  16. MSG = 'Michael Gilfix was here\n'
  17.  
  18. class SocketTCPTest(unittest.TestCase):
  19.     
  20.     def setUp(self):
  21.         self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  22.         self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  23.         self.serv.bind((HOST, PORT))
  24.         self.serv.listen(1)
  25.  
  26.     
  27.     def tearDown(self):
  28.         self.serv.close()
  29.         self.serv = None
  30.  
  31.  
  32.  
  33. class SocketUDPTest(unittest.TestCase):
  34.     
  35.     def setUp(self):
  36.         self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  37.         self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  38.         self.serv.bind((HOST, PORT))
  39.  
  40.     
  41.     def tearDown(self):
  42.         self.serv.close()
  43.         self.serv = None
  44.  
  45.  
  46.  
  47. class ThreadableTest:
  48.     """Threadable Test class
  49.  
  50.     The ThreadableTest class makes it easy to create a threaded
  51.     client/server pair from an existing unit test. To create a
  52.     new threaded class from an existing unit test, use multiple
  53.     inheritance:
  54.  
  55.         class NewClass (OldClass, ThreadableTest):
  56.             pass
  57.  
  58.     This class defines two new fixture functions with obvious
  59.     purposes for overriding:
  60.  
  61.         clientSetUp ()
  62.         clientTearDown ()
  63.  
  64.     Any new test functions within the class must then define
  65.     tests in pairs, where the test name is preceeded with a
  66.     '_' to indicate the client portion of the test. Ex:
  67.  
  68.         def testFoo(self):
  69.             # Server portion
  70.  
  71.         def _testFoo(self):
  72.             # Client portion
  73.  
  74.     Any exceptions raised by the clients during their tests
  75.     are caught and transferred to the main thread to alert
  76.     the testing framework.
  77.  
  78.     Note, the server setup function cannot call any blocking
  79.     functions that rely on the client thread during setup,
  80.     unless serverExplicityReady() is called just before
  81.     the blocking call (such as in setting up a client/server
  82.     connection and performing the accept() in setUp().
  83.     """
  84.     
  85.     def __init__(self):
  86.         self._ThreadableTest__setUp = self.setUp
  87.         self._ThreadableTest__tearDown = self.tearDown
  88.         self.setUp = self._setUp
  89.         self.tearDown = self._tearDown
  90.  
  91.     
  92.     def serverExplicitReady(self):
  93.         '''This method allows the server to explicitly indicate that
  94.         it wants the client thread to proceed. This is useful if the
  95.         server is about to execute a blocking routine that is
  96.         dependent upon the client thread during its setup routine.'''
  97.         self.server_ready.set()
  98.  
  99.     
  100.     def _setUp(self):
  101.         self.server_ready = threading.Event()
  102.         self.client_ready = threading.Event()
  103.         self.done = threading.Event()
  104.         self.queue = Queue.Queue(1)
  105.         methodname = self.id()
  106.         i = methodname.rfind('.')
  107.         methodname = methodname[i + 1:]
  108.         test_method = getattr(self, '_' + methodname)
  109.         self.client_thread = thread.start_new_thread(self.clientRun, (test_method,))
  110.         self._ThreadableTest__setUp()
  111.         if not self.server_ready.isSet():
  112.             self.server_ready.set()
  113.         
  114.         self.client_ready.wait()
  115.  
  116.     
  117.     def _tearDown(self):
  118.         self._ThreadableTest__tearDown()
  119.         self.done.wait()
  120.         if not self.queue.empty():
  121.             msg = self.queue.get()
  122.             self.fail(msg)
  123.         
  124.  
  125.     
  126.     def clientRun(self, test_func):
  127.         self.server_ready.wait()
  128.         self.client_ready.set()
  129.         self.clientSetUp()
  130.         if not callable(test_func):
  131.             raise TypeError, 'test_func must be a callable function'
  132.         
  133.         
  134.         try:
  135.             test_func()
  136.         except Exception:
  137.             strerror = None
  138.             self.queue.put(strerror)
  139.  
  140.         self.clientTearDown()
  141.  
  142.     
  143.     def clientSetUp(self):
  144.         raise NotImplementedError, 'clientSetUp must be implemented.'
  145.  
  146.     
  147.     def clientTearDown(self):
  148.         self.done.set()
  149.         thread.exit()
  150.  
  151.  
  152.  
  153. class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
  154.     
  155.     def __init__(self, methodName = 'runTest'):
  156.         SocketTCPTest.__init__(self, methodName = methodName)
  157.         ThreadableTest.__init__(self)
  158.  
  159.     
  160.     def clientSetUp(self):
  161.         self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  162.  
  163.     
  164.     def clientTearDown(self):
  165.         self.cli.close()
  166.         self.cli = None
  167.         ThreadableTest.clientTearDown(self)
  168.  
  169.  
  170.  
  171. class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
  172.     
  173.     def __init__(self, methodName = 'runTest'):
  174.         SocketUDPTest.__init__(self, methodName = methodName)
  175.         ThreadableTest.__init__(self)
  176.  
  177.     
  178.     def clientSetUp(self):
  179.         self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  180.  
  181.  
  182.  
  183. class SocketConnectedTest(ThreadedTCPSocketTest):
  184.     
  185.     def __init__(self, methodName = 'runTest'):
  186.         ThreadedTCPSocketTest.__init__(self, methodName = methodName)
  187.  
  188.     
  189.     def setUp(self):
  190.         ThreadedTCPSocketTest.setUp(self)
  191.         self.serverExplicitReady()
  192.         (conn, addr) = self.serv.accept()
  193.         self.cli_conn = conn
  194.  
  195.     
  196.     def tearDown(self):
  197.         self.cli_conn.close()
  198.         self.cli_conn = None
  199.         ThreadedTCPSocketTest.tearDown(self)
  200.  
  201.     
  202.     def clientSetUp(self):
  203.         ThreadedTCPSocketTest.clientSetUp(self)
  204.         self.cli.connect((HOST, PORT))
  205.         self.serv_conn = self.cli
  206.  
  207.     
  208.     def clientTearDown(self):
  209.         self.serv_conn.close()
  210.         self.serv_conn = None
  211.         ThreadedTCPSocketTest.clientTearDown(self)
  212.  
  213.  
  214.  
  215. class SocketPairTest(unittest.TestCase, ThreadableTest):
  216.     
  217.     def __init__(self, methodName = 'runTest'):
  218.         unittest.TestCase.__init__(self, methodName = methodName)
  219.         ThreadableTest.__init__(self)
  220.  
  221.     
  222.     def setUp(self):
  223.         (self.serv, self.cli) = socket.socketpair()
  224.  
  225.     
  226.     def tearDown(self):
  227.         self.serv.close()
  228.         self.serv = None
  229.  
  230.     
  231.     def clientSetUp(self):
  232.         pass
  233.  
  234.     
  235.     def clientTearDown(self):
  236.         self.cli.close()
  237.         self.cli = None
  238.         ThreadableTest.clientTearDown(self)
  239.  
  240.  
  241.  
  242. class GeneralModuleTests(unittest.TestCase):
  243.     
  244.     def test_weakref(self):
  245.         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  246.         p = proxy(s)
  247.         self.assertEqual(p.fileno(), s.fileno())
  248.         s.close()
  249.         s = None
  250.         
  251.         try:
  252.             p.fileno()
  253.         except ReferenceError:
  254.             pass
  255.  
  256.         self.fail('Socket proxy still exists')
  257.  
  258.     
  259.     def testSocketError(self):
  260.         
  261.         def raise_error(*args, **kwargs):
  262.             raise socket.error
  263.  
  264.         
  265.         def raise_herror(*args, **kwargs):
  266.             raise socket.herror
  267.  
  268.         
  269.         def raise_gaierror(*args, **kwargs):
  270.             raise socket.gaierror
  271.  
  272.         self.failUnlessRaises(socket.error, raise_error, 'Error raising socket exception.')
  273.         self.failUnlessRaises(socket.error, raise_herror, 'Error raising socket exception.')
  274.         self.failUnlessRaises(socket.error, raise_gaierror, 'Error raising socket exception.')
  275.  
  276.     
  277.     def testCrucialConstants(self):
  278.         socket.AF_INET
  279.         socket.SOCK_STREAM
  280.         socket.SOCK_DGRAM
  281.         socket.SOCK_RAW
  282.         socket.SOCK_RDM
  283.         socket.SOCK_SEQPACKET
  284.         socket.SOL_SOCKET
  285.         socket.SO_REUSEADDR
  286.  
  287.     
  288.     def testHostnameRes(self):
  289.         hostname = socket.gethostname()
  290.         
  291.         try:
  292.             ip = socket.gethostbyname(hostname)
  293.         except socket.error:
  294.             return None
  295.  
  296.         self.assert_(ip.find('.') >= 0, 'Error resolving host to ip.')
  297.         
  298.         try:
  299.             (hname, aliases, ipaddrs) = socket.gethostbyaddr(ip)
  300.         except socket.error:
  301.             return None
  302.  
  303.         all_host_names = [
  304.             hostname,
  305.             hname] + aliases
  306.         fqhn = socket.getfqdn()
  307.         if fqhn not in all_host_names:
  308.             self.fail('Error testing host resolution mechanisms.')
  309.         
  310.  
  311.     
  312.     def testRefCountGetNameInfo(self):
  313.         import sys as sys
  314.         if hasattr(sys, 'getrefcount'):
  315.             
  316.             try:
  317.                 orig = sys.getrefcount(__name__)
  318.                 socket.getnameinfo(__name__, 0)
  319.             except SystemError:
  320.                 if sys.getrefcount(__name__) != orig:
  321.                     self.fail('socket.getnameinfo loses a reference')
  322.                 
  323.             except:
  324.                 sys.getrefcount(__name__) != orig
  325.             
  326.  
  327.         None<EXCEPTION MATCH>SystemError
  328.  
  329.     
  330.     def testInterpreterCrash(self):
  331.         
  332.         try:
  333.             socket.getnameinfo(('x', 0, 0, 0), 0)
  334.         except socket.error:
  335.             pass
  336.  
  337.  
  338.     
  339.     def testNtoH(self):
  340.         sizes = {
  341.             socket.htonl: 32,
  342.             socket.ntohl: 32,
  343.             socket.htons: 16,
  344.             socket.ntohs: 16 }
  345.         for func, size in sizes.items():
  346.             mask = (0x1L << size) - 1
  347.             for i in (0, 1, 65535, -65536, 2, 19088743, 1985229328):
  348.                 self.assertEqual(i & mask, func(func(i & mask)) & mask)
  349.             
  350.             swapped = func(mask)
  351.             self.assertEqual(swapped & mask, mask)
  352.             self.assertRaises(OverflowError, func, 0x1L << 34)
  353.         
  354.  
  355.     
  356.     def testGetServBy(self):
  357.         eq = self.assertEqual
  358.         if sys.platform in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6', 'darwin'):
  359.             services = ('daytime', 'qotd', 'domain')
  360.         else:
  361.             services = ('echo', 'daytime', 'domain')
  362.         for service in services:
  363.             
  364.             try:
  365.                 port = socket.getservbyname(service, 'tcp')
  366.             continue
  367.             except socket.error:
  368.                 continue
  369.             
  370.  
  371.         else:
  372.             raise socket.error
  373.         port2 = socket.getservbyname(service)
  374.         eq(port, port2)
  375.         
  376.         try:
  377.             udpport = socket.getservbyname(service, 'udp')
  378.         except socket.error:
  379.             None<EXCEPTION MATCH>socket.error
  380.             None<EXCEPTION MATCH>socket.error
  381.             udpport = None
  382.         except:
  383.             None<EXCEPTION MATCH>socket.error
  384.  
  385.         eq(udpport, port)
  386.         eq(socket.getservbyport(port2), service)
  387.         eq(socket.getservbyport(port, 'tcp'), service)
  388.         if udpport is not None:
  389.             eq(socket.getservbyport(udpport, 'udp'), service)
  390.         
  391.  
  392.     
  393.     def testDefaultTimeout(self):
  394.         self.assertEqual(socket.getdefaulttimeout(), None)
  395.         s = socket.socket()
  396.         self.assertEqual(s.gettimeout(), None)
  397.         s.close()
  398.         socket.setdefaulttimeout(10)
  399.         self.assertEqual(socket.getdefaulttimeout(), 10)
  400.         s = socket.socket()
  401.         self.assertEqual(s.gettimeout(), 10)
  402.         s.close()
  403.         socket.setdefaulttimeout(None)
  404.         self.assertEqual(socket.getdefaulttimeout(), None)
  405.         s = socket.socket()
  406.         self.assertEqual(s.gettimeout(), None)
  407.         s.close()
  408.         self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
  409.         self.assertRaises(TypeError, socket.setdefaulttimeout, 'spam')
  410.  
  411.     
  412.     def testIPv4toString(self):
  413.         if not hasattr(socket, 'inet_pton'):
  414.             return None
  415.         
  416.         f = inet_aton
  417.         inet_pton = inet_pton
  418.         AF_INET = AF_INET
  419.         import socket
  420.         
  421.         g = lambda a: inet_pton(AF_INET, a)
  422.         self.assertEquals('\x00\x00\x00\x00', f('0.0.0.0'))
  423.         self.assertEquals('\xff\x00\xff\x00', f('255.0.255.0'))
  424.         self.assertEquals('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
  425.         self.assertEquals('\x01\x02\x03\x04', f('1.2.3.4'))
  426.         self.assertEquals('\xff\xff\xff\xff', f('255.255.255.255'))
  427.         self.assertEquals('\x00\x00\x00\x00', g('0.0.0.0'))
  428.         self.assertEquals('\xff\x00\xff\x00', g('255.0.255.0'))
  429.         self.assertEquals('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
  430.         self.assertEquals('\xff\xff\xff\xff', g('255.255.255.255'))
  431.  
  432.     
  433.     def testIPv6toString(self):
  434.         if not hasattr(socket, 'inet_pton'):
  435.             return None
  436.         
  437.         
  438.         try:
  439.             inet_pton = inet_pton
  440.             AF_INET6 = AF_INET6
  441.             has_ipv6 = has_ipv6
  442.             import socket
  443.             if not has_ipv6:
  444.                 return None
  445.         except ImportError:
  446.             return None
  447.  
  448.         
  449.         f = lambda a: inet_pton(AF_INET6, a)
  450.         self.assertEquals('\x00' * 16, f('::'))
  451.         self.assertEquals('\x00' * 16, f('0::0'))
  452.         self.assertEquals('\x00\x01' + '\x00' * 14, f('1::'))
  453.         self.assertEquals('E\xefv\xcb\x00\x1aV\xef\xaf\xeb\x0b\xac\x19$\xae\xae', f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae'))
  454.  
  455.     
  456.     def testStringToIPv4(self):
  457.         if not hasattr(socket, 'inet_ntop'):
  458.             return None
  459.         
  460.         f = inet_ntoa
  461.         inet_ntop = inet_ntop
  462.         AF_INET = AF_INET
  463.         import socket
  464.         
  465.         g = lambda a: inet_ntop(AF_INET, a)
  466.         self.assertEquals('1.0.1.0', f('\x01\x00\x01\x00'))
  467.         self.assertEquals('170.85.170.85', f('\xaaU\xaaU'))
  468.         self.assertEquals('255.255.255.255', f('\xff\xff\xff\xff'))
  469.         self.assertEquals('1.2.3.4', f('\x01\x02\x03\x04'))
  470.         self.assertEquals('1.0.1.0', g('\x01\x00\x01\x00'))
  471.         self.assertEquals('170.85.170.85', g('\xaaU\xaaU'))
  472.         self.assertEquals('255.255.255.255', g('\xff\xff\xff\xff'))
  473.  
  474.     
  475.     def testStringToIPv6(self):
  476.         if not hasattr(socket, 'inet_ntop'):
  477.             return None
  478.         
  479.         
  480.         try:
  481.             inet_ntop = inet_ntop
  482.             AF_INET6 = AF_INET6
  483.             has_ipv6 = has_ipv6
  484.             import socket
  485.             if not has_ipv6:
  486.                 return None
  487.         except ImportError:
  488.             return None
  489.  
  490.         
  491.         f = lambda a: inet_ntop(AF_INET6, a)
  492.         self.assertEquals('::', f('\x00' * 16))
  493.         self.assertEquals('::1', f('\x00' * 15 + '\x01'))
  494.         self.assertEquals('aef:b01:506:1001:ffff:9997:55:170', f('\n\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00U\x01p'))
  495.  
  496.     
  497.     def testSockName(self):
  498.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  499.         sock.bind(('0.0.0.0', PORT + 1))
  500.         name = sock.getsockname()
  501.         self.assertEqual(name, ('0.0.0.0', PORT + 1))
  502.  
  503.     
  504.     def testGetSockOpt(self):
  505.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  506.         reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
  507.         self.failIf(reuse != 0, 'initial mode is reuse')
  508.  
  509.     
  510.     def testSetSockOpt(self):
  511.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  512.         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  513.         reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
  514.         self.failIf(reuse == 0, 'failed to set reuse mode')
  515.  
  516.     
  517.     def testSendAfterClose(self):
  518.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  519.         sock.settimeout(1)
  520.         sock.close()
  521.         self.assertRaises(socket.error, sock.send, 'spam')
  522.  
  523.  
  524.  
  525. class BasicTCPTest(SocketConnectedTest):
  526.     
  527.     def __init__(self, methodName = 'runTest'):
  528.         SocketConnectedTest.__init__(self, methodName = methodName)
  529.  
  530.     
  531.     def testRecv(self):
  532.         msg = self.cli_conn.recv(1024)
  533.         self.assertEqual(msg, MSG)
  534.  
  535.     
  536.     def _testRecv(self):
  537.         self.serv_conn.send(MSG)
  538.  
  539.     
  540.     def testOverFlowRecv(self):
  541.         seg1 = self.cli_conn.recv(len(MSG) - 3)
  542.         seg2 = self.cli_conn.recv(1024)
  543.         msg = seg1 + seg2
  544.         self.assertEqual(msg, MSG)
  545.  
  546.     
  547.     def _testOverFlowRecv(self):
  548.         self.serv_conn.send(MSG)
  549.  
  550.     
  551.     def testRecvFrom(self):
  552.         (msg, addr) = self.cli_conn.recvfrom(1024)
  553.         self.assertEqual(msg, MSG)
  554.  
  555.     
  556.     def _testRecvFrom(self):
  557.         self.serv_conn.send(MSG)
  558.  
  559.     
  560.     def testOverFlowRecvFrom(self):
  561.         (seg1, addr) = self.cli_conn.recvfrom(len(MSG) - 3)
  562.         (seg2, addr) = self.cli_conn.recvfrom(1024)
  563.         msg = seg1 + seg2
  564.         self.assertEqual(msg, MSG)
  565.  
  566.     
  567.     def _testOverFlowRecvFrom(self):
  568.         self.serv_conn.send(MSG)
  569.  
  570.     
  571.     def testSendAll(self):
  572.         msg = ''
  573.         while None:
  574.             read = self.cli_conn.recv(1024)
  575.             if not read:
  576.                 break
  577.             
  578.             msg += read
  579.         self.assertEqual(msg, 'f' * 2048)
  580.  
  581.     
  582.     def _testSendAll(self):
  583.         big_chunk = 'f' * 2048
  584.         self.serv_conn.sendall(big_chunk)
  585.  
  586.     
  587.     def testFromFd(self):
  588.         if not hasattr(socket, 'fromfd'):
  589.             return None
  590.         
  591.         fd = self.cli_conn.fileno()
  592.         sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
  593.         msg = sock.recv(1024)
  594.         self.assertEqual(msg, MSG)
  595.  
  596.     
  597.     def _testFromFd(self):
  598.         self.serv_conn.send(MSG)
  599.  
  600.     
  601.     def testShutdown(self):
  602.         msg = self.cli_conn.recv(1024)
  603.         self.assertEqual(msg, MSG)
  604.  
  605.     
  606.     def _testShutdown(self):
  607.         self.serv_conn.send(MSG)
  608.         self.serv_conn.shutdown(2)
  609.  
  610.  
  611.  
  612. class BasicUDPTest(ThreadedUDPSocketTest):
  613.     
  614.     def __init__(self, methodName = 'runTest'):
  615.         ThreadedUDPSocketTest.__init__(self, methodName = methodName)
  616.  
  617.     
  618.     def testSendtoAndRecv(self):
  619.         msg = self.serv.recv(len(MSG))
  620.         self.assertEqual(msg, MSG)
  621.  
  622.     
  623.     def _testSendtoAndRecv(self):
  624.         self.cli.sendto(MSG, 0, (HOST, PORT))
  625.  
  626.     
  627.     def testRecvFrom(self):
  628.         (msg, addr) = self.serv.recvfrom(len(MSG))
  629.         self.assertEqual(msg, MSG)
  630.  
  631.     
  632.     def _testRecvFrom(self):
  633.         self.cli.sendto(MSG, 0, (HOST, PORT))
  634.  
  635.  
  636.  
  637. class BasicSocketPairTest(SocketPairTest):
  638.     
  639.     def __init__(self, methodName = 'runTest'):
  640.         SocketPairTest.__init__(self, methodName = methodName)
  641.  
  642.     
  643.     def testRecv(self):
  644.         msg = self.serv.recv(1024)
  645.         self.assertEqual(msg, MSG)
  646.  
  647.     
  648.     def _testRecv(self):
  649.         self.cli.send(MSG)
  650.  
  651.     
  652.     def testSend(self):
  653.         self.serv.send(MSG)
  654.  
  655.     
  656.     def _testSend(self):
  657.         msg = self.cli.recv(1024)
  658.         self.assertEqual(msg, MSG)
  659.  
  660.  
  661.  
  662. class NonBlockingTCPTests(ThreadedTCPSocketTest):
  663.     
  664.     def __init__(self, methodName = 'runTest'):
  665.         ThreadedTCPSocketTest.__init__(self, methodName = methodName)
  666.  
  667.     
  668.     def testSetBlocking(self):
  669.         self.serv.setblocking(0)
  670.         start = time.time()
  671.         
  672.         try:
  673.             self.serv.accept()
  674.         except socket.error:
  675.             pass
  676.  
  677.         end = time.time()
  678.         self.assert_(end - start < 1.0, 'Error setting non-blocking mode.')
  679.  
  680.     
  681.     def _testSetBlocking(self):
  682.         pass
  683.  
  684.     
  685.     def testAccept(self):
  686.         self.serv.setblocking(0)
  687.         
  688.         try:
  689.             (conn, addr) = self.serv.accept()
  690.         except socket.error:
  691.             pass
  692.  
  693.         self.fail('Error trying to do non-blocking accept.')
  694.         (read, write, err) = select.select([
  695.             self.serv], [], [])
  696.         if self.serv in read:
  697.             (conn, addr) = self.serv.accept()
  698.         else:
  699.             self.fail('Error trying to do accept after select.')
  700.  
  701.     
  702.     def _testAccept(self):
  703.         time.sleep(0.10000000000000001)
  704.         self.cli.connect((HOST, PORT))
  705.  
  706.     
  707.     def testConnect(self):
  708.         (conn, addr) = self.serv.accept()
  709.  
  710.     
  711.     def _testConnect(self):
  712.         self.cli.settimeout(10)
  713.         self.cli.connect((HOST, PORT))
  714.  
  715.     
  716.     def testRecv(self):
  717.         (conn, addr) = self.serv.accept()
  718.         conn.setblocking(0)
  719.         
  720.         try:
  721.             msg = conn.recv(len(MSG))
  722.         except socket.error:
  723.             pass
  724.  
  725.         self.fail('Error trying to do non-blocking recv.')
  726.         (read, write, err) = select.select([
  727.             conn], [], [])
  728.         if conn in read:
  729.             msg = conn.recv(len(MSG))
  730.             self.assertEqual(msg, MSG)
  731.         else:
  732.             self.fail('Error during select call to non-blocking socket.')
  733.  
  734.     
  735.     def _testRecv(self):
  736.         self.cli.connect((HOST, PORT))
  737.         time.sleep(0.10000000000000001)
  738.         self.cli.send(MSG)
  739.  
  740.  
  741.  
  742. class FileObjectClassTestCase(SocketConnectedTest):
  743.     bufsize = -1
  744.     
  745.     def __init__(self, methodName = 'runTest'):
  746.         SocketConnectedTest.__init__(self, methodName = methodName)
  747.  
  748.     
  749.     def setUp(self):
  750.         SocketConnectedTest.setUp(self)
  751.         self.serv_file = self.cli_conn.makefile('rb', self.bufsize)
  752.  
  753.     
  754.     def tearDown(self):
  755.         self.serv_file.close()
  756.         self.assert_(self.serv_file.closed)
  757.         self.serv_file = None
  758.         SocketConnectedTest.tearDown(self)
  759.  
  760.     
  761.     def clientSetUp(self):
  762.         SocketConnectedTest.clientSetUp(self)
  763.         self.cli_file = self.serv_conn.makefile('wb')
  764.  
  765.     
  766.     def clientTearDown(self):
  767.         self.cli_file.close()
  768.         self.assert_(self.cli_file.closed)
  769.         self.cli_file = None
  770.         SocketConnectedTest.clientTearDown(self)
  771.  
  772.     
  773.     def testSmallRead(self):
  774.         first_seg = self.serv_file.read(len(MSG) - 3)
  775.         second_seg = self.serv_file.read(3)
  776.         msg = first_seg + second_seg
  777.         self.assertEqual(msg, MSG)
  778.  
  779.     
  780.     def _testSmallRead(self):
  781.         self.cli_file.write(MSG)
  782.         self.cli_file.flush()
  783.  
  784.     
  785.     def testFullRead(self):
  786.         msg = self.serv_file.read()
  787.         self.assertEqual(msg, MSG)
  788.  
  789.     
  790.     def _testFullRead(self):
  791.         self.cli_file.write(MSG)
  792.         self.cli_file.close()
  793.  
  794.     
  795.     def testUnbufferedRead(self):
  796.         buf = ''
  797.         while None:
  798.             char = self.serv_file.read(1)
  799.             if not char:
  800.                 break
  801.             
  802.             buf += char
  803.         self.assertEqual(buf, MSG)
  804.  
  805.     
  806.     def _testUnbufferedRead(self):
  807.         self.cli_file.write(MSG)
  808.         self.cli_file.flush()
  809.  
  810.     
  811.     def testReadline(self):
  812.         line = self.serv_file.readline()
  813.         self.assertEqual(line, MSG)
  814.  
  815.     
  816.     def _testReadline(self):
  817.         self.cli_file.write(MSG)
  818.         self.cli_file.flush()
  819.  
  820.     
  821.     def testClosedAttr(self):
  822.         self.assert_(not (self.serv_file.closed))
  823.  
  824.     
  825.     def _testClosedAttr(self):
  826.         self.assert_(not (self.cli_file.closed))
  827.  
  828.  
  829.  
  830. class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
  831.     """Repeat the tests from FileObjectClassTestCase with bufsize==0.
  832.  
  833.     In this case (and in this case only), it should be possible to
  834.     create a file object, read a line from it, create another file
  835.     object, read another line from it, without loss of data in the
  836.     first file object's buffer.  Note that httplib relies on this
  837.     when reading multiple requests from the same socket."""
  838.     bufsize = 0
  839.     
  840.     def testUnbufferedReadline(self):
  841.         line = self.serv_file.readline()
  842.         self.assertEqual(line, 'A. ' + MSG)
  843.         self.serv_file = self.cli_conn.makefile('rb', 0)
  844.         line = self.serv_file.readline()
  845.         self.assertEqual(line, 'B. ' + MSG)
  846.  
  847.     
  848.     def _testUnbufferedReadline(self):
  849.         self.cli_file.write('A. ' + MSG)
  850.         self.cli_file.write('B. ' + MSG)
  851.         self.cli_file.flush()
  852.  
  853.  
  854.  
  855. class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
  856.     bufsize = 1
  857.  
  858.  
  859. class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
  860.     bufsize = 2
  861.  
  862.  
  863. class TCPTimeoutTest(SocketTCPTest):
  864.     
  865.     def testTCPTimeout(self):
  866.         
  867.         def raise_timeout(*args, **kwargs):
  868.             self.serv.settimeout(1.0)
  869.             self.serv.accept()
  870.  
  871.         self.failUnlessRaises(socket.timeout, raise_timeout, 'Error generating a timeout exception (TCP)')
  872.  
  873.     
  874.     def testTimeoutZero(self):
  875.         ok = False
  876.         
  877.         try:
  878.             self.serv.settimeout(0.0)
  879.             foo = self.serv.accept()
  880.         except socket.timeout:
  881.             self.fail('caught timeout instead of error (TCP)')
  882.         except socket.error:
  883.             ok = True
  884.         except:
  885.             self.fail('caught unexpected exception (TCP)')
  886.  
  887.         if not ok:
  888.             self.fail('accept() returned success when we did not expect it')
  889.         
  890.  
  891.  
  892.  
  893. class UDPTimeoutTest(SocketTCPTest):
  894.     
  895.     def testUDPTimeout(self):
  896.         
  897.         def raise_timeout(*args, **kwargs):
  898.             self.serv.settimeout(1.0)
  899.             self.serv.recv(1024)
  900.  
  901.         self.failUnlessRaises(socket.timeout, raise_timeout, 'Error generating a timeout exception (UDP)')
  902.  
  903.     
  904.     def testTimeoutZero(self):
  905.         ok = False
  906.         
  907.         try:
  908.             self.serv.settimeout(0.0)
  909.             foo = self.serv.recv(1024)
  910.         except socket.timeout:
  911.             self.fail('caught timeout instead of error (UDP)')
  912.         except socket.error:
  913.             ok = True
  914.         except:
  915.             self.fail('caught unexpected exception (UDP)')
  916.  
  917.         if not ok:
  918.             self.fail('recv() returned success when we did not expect it')
  919.         
  920.  
  921.  
  922.  
  923. class TestExceptions(unittest.TestCase):
  924.     
  925.     def testExceptionTree(self):
  926.         self.assert_(issubclass(socket.error, Exception))
  927.         self.assert_(issubclass(socket.herror, socket.error))
  928.         self.assert_(issubclass(socket.gaierror, socket.error))
  929.         self.assert_(issubclass(socket.timeout, socket.error))
  930.  
  931.  
  932.  
  933. def test_main():
  934.     tests = [
  935.         GeneralModuleTests,
  936.         BasicTCPTest,
  937.         TCPTimeoutTest,
  938.         TestExceptions]
  939.     if sys.platform != 'mac':
  940.         tests.extend([
  941.             BasicUDPTest,
  942.             UDPTimeoutTest])
  943.     
  944.     tests.extend([
  945.         NonBlockingTCPTests,
  946.         FileObjectClassTestCase,
  947.         UnbufferedFileObjectClassTestCase,
  948.         LineBufferedFileObjectClassTestCase,
  949.         SmallBufferedFileObjectClassTestCase])
  950.     if hasattr(socket, 'socketpair'):
  951.         tests.append(BasicSocketPairTest)
  952.     
  953.     test_support.run_unittest(*tests)
  954.  
  955. if __name__ == '__main__':
  956.     test_main()
  957.  
  958.